Let us set some global options for all code chunks in this document.

knitr::opts_chunk$set(
  message = FALSE,    # Disable messages printed by R code chunks
  warning = FALSE,    # Disable warnings printed by R code chunks
  echo = TRUE,        # Show R code within code chunks in output
  include = TRUE,     # Include both R code and its results in output
  eval = TRUE,       # Evaluate R code chunks
  cache = FALSE,       # Enable caching of R code chunks for faster rendering
  fig.align = "center",
  out.width = "100%",
  retina = 2,
  error = TRUE,
  collapse = TRUE
)
rm(list = ls())
set.seed(1982)
{python}
import os
import time
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
from torchdiffeq import odeint
import matplotlib.pyplot as plt

import matplotlib
matplotlib.rcParams['figure.dpi'] = 400
# Set your desired parameters here
method = 'dopri5'
data_size = 1000
batch_time = 10
batch_size = 20
niters = 2000 #2000
test_freq = 100#20
viz = True
gpu = 0
adjoint = False

device = torch.device('cuda:' + str(gpu) if torch.cuda.is_available() else 'cpu')

true_y0 = torch.tensor([[2., 0.]]).to(device)
t = torch.linspace(0., 25., data_size).to(device)
true_A = torch.tensor([[-0.1, 2.0], [-2.0, -0.1]]).to(device)


class Lambda(nn.Module):

    def forward(self, t, y):
        return torch.mm(y**3, true_A)


with torch.no_grad():
    true_y = odeint(Lambda(), true_y0, t, method='dopri5')


def get_batch():
    s = torch.from_numpy(np.random.choice(np.arange(data_size - batch_time, dtype=np.int64), batch_size, replace=False))
    batch_y0 = true_y[s]  # (M, D)
    batch_t = t[:batch_time]  # (T)
    batch_y = torch.stack([true_y[s + i] for i in range(batch_time)], dim=0)  # (T, M, D)
    return batch_y0.to(device), batch_t.to(device), batch_y.to(device)


def visualize(true_y, pred_y, odefunc, ii, itr):
    if viz:
        # Create the directory if it doesn't exist
        if not os.path.exists('png'):
            os.makedirs('png')

        fig, (ax_traj, ax_phase) = plt.subplots(1, 2, figsize=(10, 5))

        ax_traj.set_title(f"Trajectories")
        ax_traj.set_xlabel('$t$')
        ax_traj.set_ylabel('$x(t),y(t)$')
        ax_traj.plot(t.cpu().numpy(), true_y.cpu().numpy()[:, 0, 0], t.cpu().numpy(), true_y.cpu().numpy()[:, 0, 1], 'g-')
        ax_traj.plot(t.cpu().numpy(), pred_y.cpu().numpy()[:, 0, 0], '--', t.cpu().numpy(), pred_y.cpu().numpy()[:, 0, 1], 'b--')
        ax_traj.set_xlim(t.cpu().min(), t.cpu().max())
        ax_traj.set_ylim(-2, 2)
        ax_traj.legend(['True x', 'True y', 'Predicted x', 'Predicted y'])

        ax_phase.set_title('Iter {:04d} | Total Loss {:.6f}'.format(itr, loss.item()))
        ax_phase.set_xlabel('$x(t)$')
        ax_phase.set_ylabel('$y(t)$')
        ax_phase.plot(true_y.cpu().numpy()[:, 0, 0], true_y.cpu().numpy()[:, 0, 1], 'k-')
        ax_phase.plot(pred_y.cpu().numpy()[:, 0, 0], pred_y.cpu().numpy()[:, 0, 1], 'r--')
        ax_phase.set_xlim(-2, 2)
        ax_phase.set_ylim(-2, 2)
        ax_phase.legend(['True', 'Predicted'])

        plt.tight_layout()
        plt.savefig('png/{:03d}.png'.format(ii))
        plt.show()


class ODEFunc(nn.Module):

    def __init__(self):
        super(ODEFunc, self).__init__()

        self.net = nn.Sequential(
            nn.Linear(2, 50),
            nn.Tanh(),
            nn.Linear(50, 2),
        )

        for m in self.net.modules():
            if isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, mean=0, std=0.1)
                nn.init.constant_(m.bias, val=0)

    def forward(self, t, y):
        return self.net(y**3)


class RunningAverageMeter(object):
    """Computes and stores the average and current value"""

    def __init__(self, momentum=0.99):
        self.momentum = momentum
        self.reset()

    def reset(self):
        self.val = None
        self.avg = 0

    def update(self, val):
        if self.val is None:
            self.avg = val
        else:
            self.avg = self.avg * self.momentum + val * (1 - self.momentum)
        self.val = val


ii = 0

func = ODEFunc().to(device)

optimizer = optim.RMSprop(func.parameters(), lr=1e-3)
end = time.time()

time_meter = RunningAverageMeter(0.97)

loss_meter = RunningAverageMeter(0.97)

for itr in range(1, niters + 1):
    optimizer.zero_grad()
    batch_y0, batch_t, batch_y = get_batch()
    pred_y = odeint(func, batch_y0, batch_t, method = 'euler').to(device)
    loss = torch.mean(torch.abs(pred_y - batch_y))
    loss.backward()
    optimizer.step()

    time_meter.update(time.time() - end)
    loss_meter.update(loss.item())

    if itr % test_freq == 0:
        with torch.no_grad():
            pred_y = odeint(func, true_y0, t, method = 'euler')
            loss = torch.mean(torch.abs(pred_y - true_y))
            print('Iter {:04d} | Total Loss {:.6f}'.format(itr, loss.item()))
            visualize(true_y, pred_y, func, ii, itr)
            ii += 1

    end = time.time()
{python}
import os
import imageio

def create_gif_from_folder(folder_path, output_path, duration=1):
    images = []
    
    # Read images from the folder
    for filename in sorted(os.listdir(folder_path)):
        if filename.endswith('.png'):
            images.append(imageio.imread(os.path.join(folder_path, filename)))
    
    # Create GIF
    imageio.mimsave(output_path, images, duration=duration)

# Example usage
folder_path = "~/Desktop/Spring 2024/SFDA/Vignettes/png"
output_path = "~/Desktop/Spring 2024/SFDA/Vignettes/png/output.gif"
folder_path = os.path.expanduser(folder_path)
output_path = os.path.expanduser(output_path)
create_gif_from_folder(folder_path, output_path)
knitr::include_graphics("~/Desktop/Spring 2024/SFDA/Vignettes/png/output.gif")